feat: enable pickling for Python aggregate and window UDFs#1545

Merged
timsaucer merged 3 commits into
apache:mainfrom
timsaucer:pr2-agg-window-inline
May 20, 2026

Conversation

@timsaucer
Member

@timsaucer timsaucer commented May 15, 2026

Which issue does this PR close?

Addresses part of #1517

This is PR 2 of 4. The PRs in this series stack sequentially; subsequent PRs target this branch's tip until it merges.

Follow up PRs:

Rationale for this change

PR 1 closed the round-trip encoding and decoding of scalar UDFs. The same expression problem applies to Python aggregate and window UDFs: their accumulator / partition-evaluator factory is a Python callable, so a receiver that only has the UDF name cannot reconstruct one. This PR extends the inline-encoding mechanism so the natural pickle.dumps(expr) pattern also works for expressions referencing Python UDAFs and UDWFs.
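
The limitation can be reproduced with the standard library alone. The sketch below is illustrative and not code from this PR: `make_accumulator_factory` is a hypothetical stand-in for an accumulator factory defined inline by a user. Stdlib `pickle` stores functions by reference (module plus qualified name), so a receiver that cannot import the callable has nothing to rebuild it from; cloudpickle, which the codec uses, serializes such callables by value instead.

```python
import pickle


def make_accumulator_factory(start):
    # Hypothetical factory: returns a closure, much like an accumulator
    # factory defined inline in a script or notebook.
    def factory():
        return {"count": start}

    return factory


factory = make_accumulator_factory(0)

try:
    pickle.dumps(factory)
    shipped_by_name = True
except (pickle.PicklingError, AttributeError):
    # Stdlib pickle refuses local closures because they have no
    # importable name; the exact exception type varies by Python version.
    shipped_by_name = False

print(shipped_by_name)
```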

What changes are included in this PR?

Codec extension is a straight parallel of the scalar path. New wire-format families:

| Kind | Family magic | Cloudpickle tuple shape |
|---|---|---|
| Agg | `DFPYUDA` | `(name, accumulator_factory, input_schema_bytes, return_schema_bytes, state_schema_bytes, volatility_str)` |
| Window | `DFPYUDW` | `(name, evaluator_factory, input_schema_bytes, return_schema_bytes, volatility_str)` |

Are there any user-facing changes?

The MultiColumnWindowUDF rename is a Rust-side breaking change, so I am adding the api change label even though no Python-facing API breaks.

Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:

  DFPYUDA  (agg)     (name, accumulator_factory, input_schema_bytes,
                      return_schema_bytes, state_schema_bytes,
                      volatility_str)
  DFPYUDW  (window)  (name, evaluator_factory, input_schema_bytes,
                      return_schema_bytes, volatility_str)

Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.
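
Why the full state schema matters can be shown with a small sketch. This is not the PR's code: `Field` is a hypothetical stand-in for an Arrow field, the helper names are made up for illustration, and the real codec ships an Arrow IPC schema rather than tuples. The `state_{i}` naming follows the convention this commit message describes for constructors that pass only data types; the nullable default on synthesized fields is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    # Hypothetical stand-in for an Arrow field.
    name: str
    data_type: str
    nullable: bool = True
    metadata: tuple = ()


def roundtrip_types_only(fields):
    # Ship only the data types; the receiver has to invent names.
    types = [f.data_type for f in fields]
    return [Field(f"state_{i}", t) for i, t in enumerate(types)]


def roundtrip_full_schema(fields):
    # Ship name, type, nullability and metadata for every field.
    rows = [(f.name, f.data_type, f.nullable, f.metadata) for f in fields]
    return [Field(*row) for row in rows]


sender_state = [
    Field("sum", "float64", nullable=False),
    Field("count", "int64", nullable=False, metadata=(("unit", "rows"),)),
]

lossy = roundtrip_types_only(sender_state)
exact = roundtrip_full_schema(sender_state)

print([f.name for f in lossy])
print(exact == sender_state)
```

A consumer that looks up the `count` state field by name would work against `exact` but not against `lossy`.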

Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:

* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
  with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
  accumulator factory. Synthesizes state_{i} field names when the
  Python constructor passes only Vec<DataType>; from_parts preserves
  the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
  drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
  Py<PyAny> evaluator directly. PartialEq and Hash get the same
  pointer-identity fast path + debug-log exception handling already
  on PythonFunctionScalarUDF.
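
The pointer-identity fast path with exception handling can be sketched in Python as follows. This is an illustration of the idea only, not a translation of the Rust impl: `CallableHolder` and `Grumpy` are hypothetical names, and hashing on the name alone is an assumption made here so that equal objects always hash equal.

```python
import logging

logger = logging.getLogger(__name__)


class CallableHolder:
    """Hypothetical stand-in for a UDF wrapper that stores a Python callable."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def __eq__(self, other):
        if not isinstance(other, CallableHolder):
            return NotImplemented
        if self.name != other.name:
            return False
        # Fast path: the very same object, so no user code needs to run.
        if self.func is other.func:
            return True
        try:
            return bool(self.func == other.func)
        except Exception as exc:
            # A user-defined __eq__ may raise; log it and report "not equal".
            logger.debug("callable comparison raised: %s", exc)
            return False

    def __hash__(self):
        # Assumption: hash only the name, never the callable itself.
        return hash(self.name)


class Grumpy:
    # Callable whose comparison always fails, to exercise the slow path.
    def __call__(self):
        return None

    def __eq__(self, other):
        raise RuntimeError("no comparisons")


shared = Grumpy()
a = CallableHolder("f", shared)
b = CallableHolder("f", shared)
c = CallableHolder("f", Grumpy())

print(a == b, a == c)
```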

User-facing surface:

* AggregateUDF.name and WindowUDF.name properties (parallel to the
  ScalarUDF.name shipped in PR1).
* Existing UDAF/UDWF construction paths are unchanged.

The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@timsaucer timsaucer force-pushed the pr2-agg-window-inline branch from 3226978 to d5bb146 May 19, 2026 14:31
Re-export `to_rust_accumulator`, `to_rust_partition_evaluator`, and
`PythonFunctionWindowUDF` (with a `MultiColumnWindowUDF` alias) by
promoting `udaf` and `udwf` to `pub mod` so prior downstream Rust
consumers keep their API surface after the inline-encoding refactor.

Adds an end-to-end window UDF pickle round-trip test that runs the
decoded evaluator over a real session, mirroring the aggregate test.

Documents the cloudpickle-based shipping behavior of Python aggregate
and window UDFs in the user-guide aggregations and windows pages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@timsaucer timsaucer changed the title from "feat: inline encoding for Python aggregate and window UDFs (2/4)" to "feat: enable pickling for Python aggregate and window UDFs" May 19, 2026
@timsaucer timsaucer marked this pull request as ready for review May 19, 2026 19:14
@timsaucer timsaucer requested a review from Copilot May 20, 2026 13:41
Contributor

Copilot AI left a comment

Pull request overview

Extends the expression serialization/pickling codec so expressions that reference Python aggregate UDFs (UDAFs) and Python window UDFs (UDWFs) can round-trip via pickle.dumps(expr) / Expr.to_bytes() by inlining the Python factory callables (cloudpickled) into the wire payload, analogous to the existing scalar UDF path.

Changes:

  • Add new codec “family” payloads for Python UDAFs (DFPYUDA) and UDWFs (DFPYUDW) and implement encode/decode for both logical + physical codecs.
  • Refactor Rust UDAF/UDWF implementations to retain the underlying Python factory callable so the codec can downcast and inline it.
  • Add/extend tests and user docs describing aggregate/window UDF serialization and behavior across processes.
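
The family payloads follow the framing the PR describes: family magic, then a version byte, then a pickled tuple. A minimal sketch of that layout is below. It rests on several assumptions: the helper names `encode_inline` and `decode_inline` are hypothetical, the version value `1` is a guess, stdlib `pickle` stands in for cloudpickle, and the tuple contents are placeholders rather than a real factory and real schema bytes. Only the two magic values come from the PR.

```python
import pickle

PY_AGG_UDF_FAMILY = b"DFPYUDA"     # magic value from the PR
PY_WINDOW_UDF_FAMILY = b"DFPYUDW"  # magic value from the PR
WIRE_VERSION = 1                   # assumed; the real version byte is not stated


def encode_inline(family, payload_tuple):
    # Layout: family magic + one version byte + pickled tuple.
    return family + bytes([WIRE_VERSION]) + pickle.dumps(payload_tuple)


def decode_inline(buf):
    for family in (PY_AGG_UDF_FAMILY, PY_WINDOW_UDF_FAMILY):
        if buf.startswith(family):
            if len(buf) <= len(family):
                raise ValueError("payload truncated before version byte")
            version = buf[len(family)]
            if version != WIRE_VERSION:
                raise ValueError(f"unsupported wire version {version}")
            return family, pickle.loads(buf[len(family) + 1:])
    # Not an inline Python UDF payload; a caller would fall back elsewhere.
    return None


# Placeholder aggregate tuple: (name, factory, input schema, return schema,
# state schema, volatility). The strings and bytes are stand-ins.
agg_tuple = ("my_count", "factory-placeholder", b"in", b"ret", b"state", "immutable")
wire = encode_inline(PY_AGG_UDF_FAMILY, agg_tuple)
family, decoded = decode_inline(wire)

print(family, decoded[0])
```

Checking the prefix first lets a decoder reject unrelated byte strings cheaply before any unpickling happens.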

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.

| File | Description |
|---|---|
| python/tests/test_pickle_expr.py | Adds round-trip + execution tests for pickling/bytes serialization of Python UDAFs/UDWFs. |
| python/datafusion/user_defined.py | Adds .name properties for AggregateUDF and WindowUDF wrappers (consistent with ScalarUDF). |
| python/datafusion/ipc.py | Updates IPC docs to reflect inline shipping for scalar/aggregate/window Python UDFs. |
| python/datafusion/expr.py | Updates Expr serialization/pickle documentation to include aggregate/window UDFs. |
| docs/source/user-guide/common-operations/windows.rst | Documents how to define UDWFs and how they serialize inline. |
| docs/source/user-guide/common-operations/aggregations.rst | Documents how to define UDAFs and how they serialize inline. |
| crates/core/src/udwf.rs | Replaces the prior erased window-UDF impl with a Python-callable-retaining impl to support inlining; adds getter. |
| crates/core/src/udaf.rs | Introduces a Python-callable-retaining aggregate-UDF impl to support inlining; adds getter. |
| crates/core/src/lib.rs | Makes udaf/udwf Rust modules public. |
| crates/core/src/codec.rs | Implements new inline wire families + encode/decode helpers for Python UDAFs/UDWFs and updates wire-header tests. |

Comment thread python/tests/test_pickle_expr.py Outdated
Comment on lines +168 to +169
for s in states:
self._count += s[0].as_py()
Comment thread python/tests/test_pickle_expr.py Outdated
decoded = pickle.loads(pickle.dumps(e)) # noqa: S301

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3, 4, 5]})
Comment thread crates/core/src/codec.rs Outdated
Comment on lines +136 to +141
/// return type, state types schema, volatility).
pub(crate) const PY_AGG_UDF_FAMILY: &[u8] = b"DFPYUDA";

/// Family prefix for an inlined Python window UDF
/// (cloudpickled tuple of name, evaluator factory, input schema,
/// return type, volatility).
Comment thread crates/core/src/codec.rs Outdated
// Shared Python aggregate UDF encode / decode helpers
//
// Cloudpickle tuple shape: `(name, accumulator_factory, input_schema_bytes,
// return_type_bytes, state_schema_bytes, volatility_str)`. The accumulator
Comment thread crates/core/src/lib.rs
Comment on lines 61 to 67
 #[allow(clippy::borrow_deref_ref)]
-mod udaf;
+pub mod udaf;
 #[allow(clippy::borrow_deref_ref)]
 mod udf;
 pub mod udtf;
-mod udwf;
+pub mod udwf;

@mailmindlin mailmindlin left a comment

Looks good, contingent on the copilot stuff

Comment thread crates/core/src/codec.rs Outdated
// Python callable that produces a new evaluator instance per partition.
// =============================================================================

pub(crate) fn try_encode_python_window_udf(node: &WindowUDF, buf: &mut Vec<u8>) -> Result<bool> {

Is there a reason why you call these udaf/udwf some places but all the methods are agg_udf/window_udf?

Member Author

Yes. Some were based on hand written code and some the agent generated mostly from scratch. Then I didn't care enough to make the naming consistent, but I'll update it.

- Fix CountAcc.merge in pickle test: sum over states[0] (partition
  counts), not over the list of state fields. The prior implementation
  only added partition 0's count when merging across partitions.
- Drive test_agg_udf_evaluates_after_roundtrip with a two-batch
  DataFrame so merge actually runs and the round-tripped state-field
  schema is exercised end-to-end.
- Correct PY_AGG_UDF_FAMILY / PY_WINDOW_UDF_FAMILY doc comments and the
  aggregate block comment to reference "return schema bytes" rather
  than "return type" / "return_type_bytes" so the docs match the actual
  on-wire layout.
- Keep `udaf` and `udwf` modules private (matching `udf`) and
  selectively re-export the helpers downstream Rust consumers rely on
  (`to_rust_accumulator`, `to_rust_partition_evaluator`,
  `PythonFunctionWindowUDF`, `MultiColumnWindowUDF`) instead of
  exposing the whole module surface.
- Rename codec helpers `*_agg_udf` -> `*_udaf` and `*_window_udf` ->
  `*_udwf` for naming consistency with the Python public aliases.
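
The `merge` fix in the first bullet can be illustrated with a simplified sketch. This is not the test file's code: plain Python lists of ints stand in for the Arrow arrays the real accumulator receives, and `merge_buggy` is a hypothetical name kept only to contrast the two behaviors. The assumed layout is that `states` holds one column per state field, with one entry per partition in each column.

```python
class CountAcc:
    """Simplified counting accumulator; lists stand in for Arrow arrays."""

    def __init__(self):
        self._count = 0

    def update(self, values):
        self._count += len(values)

    def state(self):
        # One state field: the running count.
        return [self._count]

    def merge_buggy(self, states):
        # Iterates over state fields and reads only row 0 of each,
        # so only the first partition's count is added.
        for s in states:
            self._count += s[0]

    def merge(self, states):
        # states[0] is the count column, one entry per partition.
        self._count += sum(states[0])


# One state field, two partitions with counts 2 and 3.
states = [[2, 3]]

buggy = CountAcc()
buggy.merge_buggy(states)

fixed = CountAcc()
fixed.merge(states)

print(buggy.state(), fixed.state())
```

With a single partition both versions agree, which is why the test needed a two-batch DataFrame to expose the difference.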

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@timsaucer timsaucer merged commit dac9ec6 into apache:main May 20, 2026
21 checks passed
@timsaucer timsaucer deleted the pr2-agg-window-inline branch May 20, 2026 23:17
4 participants